---
title: Event Schema Design
summary: Best practices for designing and managing event schemas in FlowMart's event-driven architecture
sidebar:
    label: Event Schema Design
    order: 6
---

This guide outlines best practices for designing, evolving, and managing event schemas in FlowMart's event-driven architecture.

## Introduction to Events in Our Architecture

Events are the backbone of FlowMart's microservices ecosystem. They enable:

- **Loose coupling** between services
- **Asynchronous communication**
- **Eventual consistency** across service boundaries
- **Event sourcing** for critical business processes
- **Audit trails** of system changes

## Event Schema Fundamentals

### What Is an Event Schema?

An event schema defines the structure and validation rules for events flowing through our system. Properly designed schemas ensure events can be:

- **Produced** consistently by services 
- **Consumed** reliably by other services
- **Evolved** over time without breaking consumers
- **Validated** to prevent invalid data from propagating
- **Documented** for developers to understand and use

### Event Schema Registry

FlowMart uses a centralized **Schema Registry** to:

1. Store all event schemas
2. Validate events at publish time
3. Provide a browsable catalog of events
4. Track schema versions and compatibility
5. Generate client libraries and documentation

All services must register their event schemas in the central registry before publishing events.

## Schema Design Principles

### 1. Design for Evolution

Events should be designed to evolve over time:

- **Additive Changes Only**: Add optional fields rather than modifying existing ones
- **Required Minimal Core**: Keep required fields to essential business data
- **Meaningful Defaults**: Provide sensible defaults for optional fields
- **Version Awareness**: Include schema version information

### 2. Event Ownership

Each event type has a single owner:

- The **producing service** owns the event schema
- Only the owner can make changes to the schema
- The owner is responsible for schema compatibility

### 3. Semantic Versioning

Follow semantic versioning for event schemas:

- **Major Version**: Breaking changes (consumers must update)
- **Minor Version**: Backward-compatible feature additions
- **Patch Version**: Backward-compatible bug fixes
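
One way a consumer might act on these rules is to compare the major version of an incoming event with the major version it was built against. The sketch below assumes `eventVersion` strings in `MAJOR.MINOR.PATCH` form and a consumer that ignores unknown fields. `parseVersion` and `canConsume` are hypothetical helper names, not part of any FlowMart library.

```typescript
interface SemVer {
  major: number;
  minor: number;
  patch: number;
}

// Parses "1.2.3" into its numeric parts and throws on anything else.
function parseVersion(version: string): SemVer {
  const match = /^(\d+)\.(\d+)\.(\d+)$/.exec(version);
  if (!match) {
    throw new Error(`Not a semantic version: ${version}`);
  }
  return {
    major: Number(match[1]),
    minor: Number(match[2]),
    patch: Number(match[3]),
  };
}

// A consumer built against `supported` can read `incoming` when the major
// versions match. Minor and patch releases are backward-compatible by
// definition, so they are accepted as well.
function canConsume(supported: string, incoming: string): boolean {
  return parseVersion(supported).major === parseVersion(incoming).major;
}
```

With this sketch, `canConsume("1.0.0", "1.4.2")` is `true`, while `canConsume("1.0.0", "2.0.0")` is `false`.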

### 4. Business-Oriented Event Naming

Events should be named using business terminology:

- Use past tense verbs (e.g., `OrderPlaced`, not `CreateOrder`)
- Follow the pattern: `[Entity][Event]` (e.g., `ProductCreated`, `PaymentProcessed`)
- Use domain-specific terminology consistent with our ubiquitous language

## Event Schema Format (JSON Schema)

FlowMart uses JSON Schema as the standard format for defining event schemas:

```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://schemas.flowmart.com/events/product/ProductCreated/1.0.0",
  "title": "ProductCreated",
  "description": "Represents the creation of a new product in the catalog",
  "type": "object",
  "required": ["eventId", "eventType", "eventVersion", "timestamp", "data"],
  "properties": {
    "eventId": {
      "type": "string",
      "format": "uuid",
      "description": "Unique identifier for this event instance"
    },
    "eventType": {
      "type": "string",
      "enum": ["ProductCreated"],
      "description": "Type of the event"
    },
    "eventVersion": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$",
      "description": "Semantic version of the event schema"
    },
    "timestamp": {
      "type": "string",
      "format": "date-time",
      "description": "ISO-8601 timestamp when the event was created"
    },
    "source": {
      "type": "string",
      "description": "Service that produced the event"
    },
    "data": {
      "type": "object",
      "required": ["productId", "name", "sku", "price"],
      "properties": {
        "productId": {
          "type": "string",
          "format": "uuid",
          "description": "Unique identifier for the product"
        },
        "name": {
          "type": "string",
          "minLength": 1,
          "maxLength": 255,
          "description": "Name of the product"
        },
        "description": {
          "type": "string",
          "maxLength": 2000,
          "description": "Description of the product"
        },
        "sku": {
          "type": "string",
          "pattern": "^[A-Z0-9-]{5,20}$",
          "description": "Stock keeping unit - unique product identifier"
        },
        "price": {
          "type": "object",
          "required": ["amount", "currency"],
          "properties": {
            "amount": {
              "type": "number",
              "exclusiveMinimum": 0,
              "description": "Price amount"
            },
            "currency": {
              "type": "string",
              "enum": ["USD", "EUR", "GBP", "CAD"],
              "description": "Price currency code"
            }
          }
        },
        "categories": {
          "type": "array",
          "items": {
            "type": "string"
          },
          "description": "Categories the product belongs to"
        },
        "attributes": {
          "type": "object",
          "additionalProperties": {
            "type": ["string", "number", "boolean"]
          },
          "description": "Additional product attributes as key-value pairs"
        }
      }
    },
    "metadata": {
      "type": "object",
      "additionalProperties": true,
      "description": "Additional contextual information about the event"
    },
    "correlationId": {
      "type": "string",
      "format": "uuid",
      "description": "ID for correlating related events"
    },
    "causationId": {
      "type": "string",
      "format": "uuid",
      "description": "ID of the event that caused this event"
    }
  },
  "additionalProperties": false
}
```

## Standard Event Envelope

All FlowMart events follow a standard envelope structure:

```jsonc
{
  "eventId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "eventType": "ProductCreated",
  "eventVersion": "1.0.0",
  "timestamp": "2023-09-15T13:25:47.803Z",
  "source": "product-service",
  "data": {
    // Event-specific payload
  },
  "metadata": {
    // Optional contextual information
  },
  "correlationId": "7f8d0e3c-d5f9-42e1-a11b-78ad6c0c380a",
  "causationId": "3e4f5d6c-7b8a-9c0d-1e2f-3a4b5c6d7e8f"
}
```

### Required Envelope Fields

| Field | Type | Description |
|-------|------|-------------|
| eventId | UUID | Unique identifier for the event instance |
| eventType | String | Name of the event (e.g., `ProductCreated`) |
| eventVersion | String | Semantic version of the event schema |
| timestamp | ISO-8601 | When the event occurred |
| data | Object | Event-specific payload |

### Optional Envelope Fields

| Field | Type | Description |
|-------|------|-------------|
| source | String | Service that produced the event |
| metadata | Object | Additional context about the event |
| correlationId | UUID | ID for correlating related events in a flow |
| causationId | UUID | ID of the event that caused this event |
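
The two tables above could be mirrored in TypeScript roughly as follows. This is a minimal sketch: `EventEnvelope`, `EnvelopeOptions`, and `createEnvelope` are hypothetical names, and the caller is assumed to supply the event ID (for example from a UUID library).

```typescript
// Optional envelope fields, as listed in the table above.
interface EnvelopeOptions {
  source?: string;
  metadata?: Record<string, unknown>;
  correlationId?: string;
  causationId?: string;
}

// Required envelope fields plus the optional ones. `TData` is the
// event-specific payload type.
interface EventEnvelope<TData> extends EnvelopeOptions {
  eventId: string;
  eventType: string;
  eventVersion: string;
  timestamp: string;
  data: TData;
}

// Hypothetical helper: the event ID and the clock are passed in, which keeps
// the function deterministic and easy to test.
function createEnvelope<TData>(
  eventId: string,
  eventType: string,
  eventVersion: string,
  data: TData,
  options: EnvelopeOptions = {},
  now: Date = new Date(),
): EventEnvelope<TData> {
  return {
    eventId,
    eventType,
    eventVersion,
    timestamp: now.toISOString(),
    data,
    // Spreading copies only the optional fields that were provided, so absent
    // fields are omitted instead of being sent as null.
    ...options,
  };
}
```

Omitting absent optional fields matters because fields such as `causationId` are typed as strings in the schema, so a `null` value would fail validation.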

## Event Data Types

### Primitive Types

- **String**: Use for text data
- **Number**: Use for numeric values (integers or decimals)
- **Boolean**: Use for true/false flags
- **Array**: Use for collections of the same type
- **Object**: Use for nested structures

### Specialized Formats

- **UUID**: Use for unique identifiers (format: `uuid`)
- **ISO Date-Time**: Use for timestamps (format: `date-time`)
- **Email**: Use for email addresses (format: `email`)
- **URI**: Use for web addresses (format: `uri`)
- **Decimal**: Use for currency amounts (type: `number`)

### Complex Types

For complex or reusable types, create separate schema definitions:

```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://schemas.flowmart.com/common/Address/1.0.0",
  "title": "Address",
  "type": "object",
  "required": ["line1", "city", "postalCode", "country"],
  "properties": {
    "line1": {
      "type": "string",
      "maxLength": 100
    },
    "line2": {
      "type": "string",
      "maxLength": 100
    },
    "city": {
      "type": "string",
      "maxLength": 100
    },
    "region": {
      "type": "string",
      "maxLength": 100
    },
    "postalCode": {
      "type": "string",
      "maxLength": 20
    },
    "country": {
      "type": "string",
      "maxLength": 2,
      "pattern": "^[A-Z]{2}$"
    }
  }
}
```

Then reference them in your event schemas:

```json
{
  "properties": {
    "shippingAddress": {
      "$ref": "https://schemas.flowmart.com/common/Address/1.0.0"
    }
  }
}
```
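
To illustrate what a `$ref` means, the sketch below inlines references by looking them up in a local map keyed by `$id`. It is a simplified illustration only: real JSON Schema validators resolve references themselves, and `resolveRefs` is a hypothetical helper that does not handle relative references or schemas that reference themselves.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// Returns a copy of `node` in which every { "$ref": id } object is replaced by
// the schema registered under that id. Throws if an id is unknown.
function resolveRefs(node: Json, schemasById: Map<string, Json>): Json {
  if (Array.isArray(node)) {
    return node.map((item) => resolveRefs(item, schemasById));
  }
  if (node !== null && typeof node === "object") {
    const ref = node["$ref"];
    if (typeof ref === "string") {
      const target = schemasById.get(ref);
      if (target === undefined) {
        throw new Error(`Unknown schema reference: ${ref}`);
      }
      // The referenced schema may contain references of its own.
      return resolveRefs(target, schemasById);
    }
    const resolved: { [key: string]: Json } = {};
    for (const [key, value] of Object.entries(node)) {
      resolved[key] = resolveRefs(value, schemasById);
    }
    return resolved;
  }
  // Primitives are returned unchanged.
  return node;
}
```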

## Common Event Patterns

### State Change Events

Represent changes to an entity's state:

```json
{
  "eventType": "OrderStatusChanged",
  "data": {
    "orderId": "61fea0a1-2ac4-4e8c-a851-d38f7c8c06f9",
    "previousStatus": "PAYMENT_PENDING",
    "newStatus": "PAYMENT_COMPLETED",
    "reason": "Payment successful",
    "changedBy": "payment-service"
  }
}
```

### Resource Creation Events

Represent the creation of a new entity:

```json
{
  "eventType": "CustomerCreated",
  "data": {
    "customerId": "cust-12345",
    "email": "john.doe@example.com",
    "firstName": "John",
    "lastName": "Doe",
    "createdAt": "2023-09-15T10:30:00Z"
  }
}
```

### Resource Update Events

Represent updates to an existing entity:

```json
{
  "eventType": "ProductUpdated",
  "data": {
    "productId": "b3c631a5-f7c8-4d89-a57f-dd2f069b5730",
    "changes": {
      "price": {
        "amount": 24.99,
        "currency": "USD"
      },
      "inventory": {
        "inStock": 250
      }
    },
    "updatedAt": "2023-09-15T14:22:36Z"
  }
}
```
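
The `changes` object in an update event could be derived by comparing the entity before and after the update. The sketch below is one possible approach under simple assumptions: `diffChanges` is a hypothetical helper that compares top-level fields only and reports added or modified fields, not removed ones.

```typescript
type FieldValues = Record<string, unknown>;

// Returns the top-level fields of `current` whose values differ from
// `previous`. Values are compared by their JSON representation, which is
// adequate for plain data but sensitive to key order in nested objects.
function diffChanges(previous: FieldValues, current: FieldValues): FieldValues {
  const changes: FieldValues = {};
  for (const [key, value] of Object.entries(current)) {
    if (JSON.stringify(previous[key]) !== JSON.stringify(value)) {
      changes[key] = value;
    }
  }
  return changes;
}
```

Publishing only the changed fields keeps update events small, but consumers then need the previous state to reconstruct the full entity.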

### Action Events

Represent business actions that occurred:

```json
{
  "eventType": "PaymentProcessed",
  "data": {
    "paymentId": "pay-67890",
    "orderId": "ord-12345",
    "amount": 99.99,
    "currency": "USD",
    "status": "SUCCESSFUL",
    "paymentMethod": "CREDIT_CARD",
    "processedAt": "2023-09-15T13:45:22Z"
  }
}
```

## Schema Evolution

### Compatibility Types

We support the following compatibility modes for schema evolution:

- **Backward**: New schema can read data produced with previous schema
- **Forward**: Previous schema can read data produced with new schema
- **Full**: Both backward and forward compatibility
- **None**: No compatibility guarantees (use with caution)

### Backward Compatibility Rules

1. **Adding optional fields** is safe
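2. **Removing optional fields** is safe, unless the enclosing object sets `additionalProperties: false` (older events that still carry the field would then fail validation)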
3. **Making a required field optional** is safe
4. **Adding new enum values** is safe
5. **Widening numeric types or ranges** is safe (e.g., `integer` to `number`)

### Breaking Changes to Avoid

1. ❌ **Removing required fields**
2. ❌ **Adding required fields**
3. ❌ **Changing field types**
4. ❌ **Renaming fields**
5. ❌ **Restricting enum values**
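
The rules above can be approximated in code. The sketch below works on a deliberately simplified field model (`FieldSpec` is an assumption made for illustration; the Schema Registry checks full JSON Schema documents) and reports the breaking changes it finds between two versions.

```typescript
// Simplified, hypothetical description of one field in a schema.
interface FieldSpec {
  type: string;
  required: boolean;
  enum?: string[];
}

type SchemaFields = Record<string, FieldSpec>;

function findBreakingChanges(previous: SchemaFields, next: SchemaFields): string[] {
  const problems: string[] = [];
  for (const [name, oldField] of Object.entries(previous)) {
    const newField = next[name];
    if (newField === undefined) {
      // A rename also shows up here, as a removal plus an addition.
      if (oldField.required) problems.push(`removed required field "${name}"`);
      continue;
    }
    // Widening integer to number is treated as safe.
    const widened = oldField.type === "integer" && newField.type === "number";
    if (newField.type !== oldField.type && !widened) {
      problems.push(`changed type of "${name}"`);
    }
    // Making an optional field required is treated like adding a required field.
    if (newField.required && !oldField.required) {
      problems.push(`made field "${name}" required`);
    }
    const newEnum = newField.enum;
    if (
      oldField.enum !== undefined &&
      newEnum !== undefined &&
      oldField.enum.some((value) => !newEnum.includes(value))
    ) {
      problems.push(`restricted enum values of "${name}"`);
    }
  }
  for (const [name, newField] of Object.entries(next)) {
    if (previous[name] === undefined && newField.required) {
      problems.push(`added required field "${name}"`);
    }
  }
  return problems;
}
```

An empty result means none of the listed breaking changes were detected in this simplified model; it is not a substitute for the registry's own compatibility check.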

### Handling Breaking Changes

If you must make a breaking change:

1. Create a new major version of the schema
2. Maintain both versions for a transition period
3. Implement dual publishing for critical events
4. Help consumers migrate to the new version
5. Deprecate the old version with advance notice
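
Dual publishing (step 3) might look like the sketch below. Everything in it is hypothetical: the `ProductPriceChanged` event, its two payload shapes, and the `Publish` callback stand in for whatever event and producer client a service actually uses. Consumers are assumed to process only the major version they support.

```typescript
interface VersionedEvent<TData> {
  eventType: string;
  eventVersion: string;
  data: TData;
}

// Hypothetical v1 payload: price is a bare number.
interface PriceChangedV1 {
  productId: string;
  price: number;
}

// Hypothetical v2 payload: price became an object, which is a breaking change.
interface PriceChangedV2 {
  productId: string;
  price: { amount: number; currency: string };
}

// Builds both representations of the same business fact.
function buildTransitionEvents(
  productId: string,
  amount: number,
  currency: string,
): [VersionedEvent<PriceChangedV1>, VersionedEvent<PriceChangedV2>] {
  return [
    {
      eventType: "ProductPriceChanged",
      eventVersion: "1.0.0",
      data: { productId, price: amount },
    },
    {
      eventType: "ProductPriceChanged",
      eventVersion: "2.0.0",
      data: { productId, price: { amount, currency } },
    },
  ];
}

type Publish = (event: VersionedEvent<unknown>) => Promise<void>;

// Publishes every version in order for the duration of the transition period.
async function publishDuringTransition(
  events: VersionedEvent<unknown>[],
  publish: Publish,
): Promise<void> {
  for (const event of events) {
    await publish(event);
  }
}
```

Once all consumers have migrated, the v1 branch can be removed and the old version deprecated.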

## Consuming Events

### Consumer Best Practices

1. **Be tolerant in what you accept**:
   - Ignore unknown fields
   - Provide defaults for missing optional fields
   - Handle enum values gracefully, including unknown values

2. **Validate incoming events**:
   - Verify events against their schema
   - Check required fields
   - Validate business rules before processing

3. **Handle versioning gracefully**:
   - Check event version before processing
   - Implement version-specific handlers if needed
   - Subscribe to schema registry updates
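
Two of these practices, tolerant enum handling and version-specific handlers, can be sketched as follows. The names `readStatus` and `VersionedDispatcher` are hypothetical, and the status values are taken from the `OrderStatusChanged` example earlier in this guide.

```typescript
interface IncomingEvent {
  eventType: string;
  eventVersion: string;
  data: Record<string, unknown>;
}

type EventHandler = (event: IncomingEvent) => void;

const KNOWN_STATUSES = ["PAYMENT_PENDING", "PAYMENT_COMPLETED"];

// Tolerant enum handling: unrecognized or missing values map to "UNKNOWN"
// instead of causing the consumer to fail.
function readStatus(value: unknown): string {
  return typeof value === "string" && KNOWN_STATUSES.includes(value)
    ? value
    : "UNKNOWN";
}

// Routes each event to the handler registered for its type and major version.
class VersionedDispatcher {
  private readonly handlers = new Map<string, EventHandler>();

  register(eventType: string, majorVersion: number, handler: EventHandler): void {
    this.handlers.set(`${eventType}@${majorVersion}`, handler);
  }

  // Returns true when a handler ran and false when the event was skipped.
  dispatch(event: IncomingEvent): boolean {
    // parseInt reads the leading integer, so "2.1.0" yields 2.
    const major = Number.parseInt(event.eventVersion, 10);
    const handler = this.handlers.get(`${event.eventType}@${major}`);
    if (handler === undefined) {
      return false;
    }
    handler(event);
    return true;
  }
}
```

Returning `false` for unsupported versions lets the caller decide whether to log, skip, or dead-letter the event.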

### Consumer Code Example (TypeScript)

```typescript
import { KafkaConsumer } from '@flowmart/kafka-client';
import { SchemaRegistry } from '@flowmart/schema-registry';
import { OrderProcessingService } from './services';

// Initialize schema registry client
const schemaRegistry = new SchemaRegistry({
  baseUrl: 'https://schema-registry.flowmart.com',
});

// Define event interface
interface OrderPlacedEvent {
  eventId: string;
  eventType: 'OrderPlaced';
  eventVersion: string;
  timestamp: string;
  data: {
    orderId: string;
    customerId: string;
    items: Array<{
      productId: string;
      quantity: number;
      unitPrice: number;
    }>;
    totalAmount: number;
    currency: string;
    shippingAddress: {
      // Address fields...
    };
  };
  // Other envelope fields...
}

// Initialize consumer
const orderConsumer = new KafkaConsumer({
  groupId: 'inventory-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});

// Initialize service
const orderProcessor = new OrderProcessingService();

// Start consuming events
async function startConsumer() {
  await orderConsumer.subscribe('order-events');
  
  orderConsumer.on('message', async (message) => {
    try {
      // Parse the message
      const rawEvent = JSON.parse(message.value.toString());
      
      // Skip if not the event we're interested in
      if (rawEvent.eventType !== 'OrderPlaced') {
        return;
      }
      
      // Validate against schema
      const isValid = await schemaRegistry.validate(
        rawEvent, 
        'OrderPlaced', 
        rawEvent.eventVersion
      );
      
      if (!isValid) {
        console.error('Invalid event schema', rawEvent);
        return;
      }
      
      // Type-safe processing
      const event = rawEvent as OrderPlacedEvent;
      
      // Process the order
      await orderProcessor.processNewOrder(event.data);
      
      // Commit the offset
      await orderConsumer.commitOffset(message);
      
    } catch (error) {
      console.error('Error processing order event', error);
      // Implement retry/dead-letter logic
    }
  });
}

startConsumer().catch(console.error);
```

## Publishing Events

### Producer Best Practices

1. **Validate before publishing**:
   - Ensure events comply with their schema
   - Verify business rules and data integrity
   - Set appropriate event headers

2. **Include essential metadata**:
   - Generate a unique event ID
   - Set the correct event type and version
   - Include an accurate timestamp
   - Set correlation and causation IDs

3. **Handle publishing failures**:
   - Implement retry mechanisms with backoff
   - Store events temporarily if Kafka is unavailable
   - Log failed events for troubleshooting
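
A retry mechanism with exponential backoff could be sketched as below. This is an illustration under stated assumptions, not the behavior of `@flowmart/kafka-client`: `sendWithRetry` and `backoffDelayMs` are hypothetical helpers, and the attempt count and base delay are arbitrary defaults.

```typescript
// Delay before the next attempt: base, 2x base, 4x base, and so on.
function backoffDelayMs(attempt: number, baseDelayMs: number): number {
  return baseDelayMs * 2 ** (attempt - 1);
}

// Calls `send` until it succeeds or `maxAttempts` is reached, waiting longer
// after each failure. The last error is rethrown so the caller can log the
// failed event or store it for later delivery.
async function sendWithRetry<T>(
  send: () => Promise<T>,
  maxAttempts: number = 5,
  baseDelayMs: number = 100,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await send();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        await sleep(backoffDelayMs(attempt, baseDelayMs));
      }
    }
  }
  throw lastError;
}
```

A producer could wrap its publish call as `sendWithRetry(() => producer.send(...))`. The `sleep` parameter exists so tests can replace the real timer.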

### Producer Code Example (TypeScript)

```typescript
import { v4 as uuid } from 'uuid';
import { KafkaProducer } from '@flowmart/kafka-client';
import { SchemaRegistry } from '@flowmart/schema-registry';
import { Product } from './models';

// Initialize schema registry client
const schemaRegistry = new SchemaRegistry({
  baseUrl: 'https://schema-registry.flowmart.com',
});

// Initialize producer
const producer = new KafkaProducer({
  clientId: 'product-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});

export class ProductEventService {
  async publishProductCreated(product: Product, correlationId?: string): Promise<void> {
    const eventId = uuid();
    
    const event = {
      eventId,
      eventType: 'ProductCreated',
      eventVersion: '1.0.0',
      timestamp: new Date().toISOString(),
      source: 'product-service',
      data: {
        productId: product.id,
        name: product.name,
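        // The schema names this field `description` and types it as a string,
        // so omit it when absent instead of sending null
        ...(product.description ? { description: product.description } : {}),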
        sku: product.sku,
        price: {
          amount: product.price,
          currency: 'USD'  // Default to USD
        },
        categories: product.categories || [],
        attributes: product.attributes || {}
      },
      metadata: {
        // Add any additional metadata
      },
      correlationId: correlationId || eventId,
      // causationId is omitted: no previous event caused this one, and the
      // schema only accepts a UUID string for that field
    };
    
    // Validate against schema
    const isValid = await schemaRegistry.validate(
      event, 
      'ProductCreated', 
      '1.0.0'
    );
    
    if (!isValid) {
      const errors = await schemaRegistry.getValidationErrors(
        event, 
        'ProductCreated', 
        '1.0.0'
      );
      throw new Error(`Invalid event schema: ${JSON.stringify(errors)}`);
    }
    
    // Publish event
    await producer.send({
      topic: 'product-events',
      messages: [
        {
          key: product.id,
          value: JSON.stringify(event),
          headers: {
            'eventType': 'ProductCreated',
            'contentType': 'application/json'
          }
        }
      ]
    });
    
    console.log(`Published ProductCreated event: ${eventId}`);
  }
}
```

## Schema Registry Integration

### Registering a New Schema

```bash
# Using CLI tool
flowmart-schema register \
  --file ./schemas/ProductCreated.json \
  --compatibility BACKWARD

# API endpoint
curl -X POST https://schema-registry.flowmart.com/subjects/ProductCreated/versions \
  -H "Content-Type: application/json" \
  -d @./schemas/ProductCreated.json
```

### Retrieving a Schema

```javascript
// Using JavaScript client
const schema = await schemaRegistry.getSchema('ProductCreated', '1.0.0');

// API endpoint (run from a shell, not from JavaScript):
//   curl https://schema-registry.flowmart.com/subjects/ProductCreated/versions/latest
```

### Checking Compatibility

```javascript
// Using JavaScript client
const isCompatible = await schemaRegistry.checkCompatibility(
  newSchema, 
  'ProductCreated'
);

// API endpoint (run from a shell, not from JavaScript):
//   curl -X POST https://schema-registry.flowmart.com/compatibility/subjects/ProductCreated/versions/latest \
//     -H "Content-Type: application/json" \
//     -d @./schemas/ProductCreated.v2.json
```

## Schema Registry UI

Our Schema Registry includes a web interface at [https://schema-registry.flowmart.com/ui](https://schema-registry.flowmart.com/ui) that provides:

- Browsable catalog of all event schemas
- Schema versioning history
- Compatibility information
- Schema validation tools
- Documentation generation

## Testing Event Schemas

### Unit Testing Schemas

```javascript
import { validateAgainstSchema } from '@flowmart/schema-validator';
import productCreatedSchema from './schemas/ProductCreated.json';

describe('ProductCreated schema', () => {
  it('validates valid events', () => {
    const validEvent = {
      eventId: 'f47ac10b-58cc-4372-a567-0e02b2c3d479',
      eventType: 'ProductCreated',
      eventVersion: '1.0.0',
      timestamp: '2023-09-15T13:25:47.803Z',
      data: {
        productId: 'b3c631a5-f7c8-4d89-a57f-dd2f069b5730',
        name: 'Smartphone X Pro',
        sku: 'SP-XPRO-2023',
        price: {
          amount: 999.99,
          currency: 'USD'
        }
      }
    };
    
    const result = validateAgainstSchema(validEvent, productCreatedSchema);
    expect(result.valid).toBe(true);
  });
  
  it('rejects events with missing required fields', () => {
    const invalidEvent = {
      eventId: 'f47ac10b-58cc-4372-a567-0e02b2c3d479',
      eventType: 'ProductCreated',
      eventVersion: '1.0.0',
      timestamp: '2023-09-15T13:25:47.803Z',
      data: {
        // Missing required productId
        name: 'Smartphone X Pro',
        // Missing required sku
        price: {
          amount: 999.99,
          currency: 'USD'
        }
      }
    };
    
    const result = validateAgainstSchema(invalidEvent, productCreatedSchema);
    expect(result.valid).toBe(false);
    expect(result.errors.length).toBeGreaterThan(0);
  });
});
```

### Integration Testing with Schema Registry

```javascript
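import { v4 as uuid } from 'uuid';
import { SchemaRegistry } from '@flowmart/schema-registry';
// Assumed fixture path for the schema under test
import testEventSchema from './schemas/TestEvent.json';

// Point this at a test instance of the registry where one is available
const schemaRegistry = new SchemaRegistry({
  baseUrl: 'https://schema-registry.flowmart.com',
});

describe('Schema Registry Integration', () => {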
  it('registers and validates schema', async () => {
    // Register test schema
    await schemaRegistry.registerSchema(
      'TestEvent',
      testEventSchema,
      'BACKWARD'
    );
    
    // Create test event
    const testEvent = {
      eventId: uuid(),
      eventType: 'TestEvent',
      eventVersion: '1.0.0',
      timestamp: new Date().toISOString(),
      data: {
        // Test data...
      }
    };
    
    // Validate against registered schema
    const isValid = await schemaRegistry.validate(
      testEvent,
      'TestEvent',
      '1.0.0'
    );
    
    expect(isValid).toBe(true);
  });
});
```

## Event Documentation

### Self-Documenting Schemas

Use descriptive fields in your JSON Schema to auto-generate documentation:

```json
{
  "title": "ProductCreated",
  "description": "Published when a new product is created in the catalog",
  "properties": {
    "data": {
      "properties": {
        "productId": {
          "description": "Unique identifier for the product",
          "examples": ["b3c631a5-f7c8-4d89-a57f-dd2f069b5730"]
        }
      }
    }
  }
}
```

### Documentation in Code

Document event handling with clear comments:

```typescript
/**
 * Handles the ProductCreated event
 * This event is triggered when a new product is added to the catalog.
 * It updates the inventory service with the new product information.
 * 
 * @param event The ProductCreated event
 * @see https://schema-registry.flowmart.com/ui/schemas/ProductCreated/1.0.0
 */
async function handleProductCreated(event: ProductCreatedEvent): Promise<void> {
  // Implementation...
}
```

## Event Tracing and Debugging

### Correlation IDs

Use correlation IDs to trace requests across services:

```typescript
// When handling an API request
const correlationId = req.headers['x-correlation-id'] || uuid();

// Include in all events
const event = {
  // Other event fields...
  correlationId,
  // If this event was caused by another event
  causationId: previousEvent?.eventId
};
```
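
To show how the two IDs behave along a chain of events, the sketch below derives them from an optional parent event. `traceFieldsFor` is a hypothetical helper; it assumes a root event reuses its own event ID as the correlation ID, as the producer example in this guide does.

```typescript
interface ParentEvent {
  eventId: string;
  correlationId?: string;
}

interface TraceFields {
  correlationId: string;
  causationId?: string;
}

function traceFieldsFor(newEventId: string, parent?: ParentEvent): TraceFields {
  if (parent === undefined) {
    // Root event: it starts a new flow and has no causation ID.
    return { correlationId: newEventId };
  }
  return {
    // The correlation ID is inherited unchanged along the whole flow.
    correlationId: parent.correlationId ?? parent.eventId,
    // The causation ID always points at the immediate parent.
    causationId: parent.eventId,
  };
}
```

Every event in a flow therefore shares one correlation ID, while each causation ID identifies the single event that triggered it.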

### Event Logging

Log event publishing and consumption in a consistent format:

```typescript
// Producer logging
logger.info('Publishing event', {
  eventId: event.eventId,
  eventType: event.eventType,
  correlationId: event.correlationId
});

// Consumer logging
logger.info('Consuming event', {
  eventId: event.eventId,
  eventType: event.eventType,
  correlationId: event.correlationId,
  consumer: 'inventory-service'
});
```

## Event Monitoring

Monitor your event streams using our standard observability stack:

1. **Kafka Metrics**: Lag, throughput, errors
2. **Schema Registry Metrics**: Validation failures, compatibility checks
3. **Service Metrics**: Event processing times, failure rates
4. **Custom Dashboards**: Domain-specific event flows

Access the dashboards at [https://grafana.flowmart.com/d/events](https://grafana.flowmart.com/d/events).

## Event Schema Governance

### Change Management

1. **Proposal**: Document the schema change with rationale
2. **Review**: Domain experts review for business requirements
3. **Compatibility Check**: Verify with schema registry
4. **Approval**: Get sign-off from service team leads
5. **Publication**: Register schema and announce change

### Schema Review Checklist

✅ Schema follows naming conventions  
✅ Required fields are truly necessary  
✅ Field types are appropriate  
✅ Enums have complete value lists  
✅ Constraints (min, max, etc.) are appropriate  
✅ Documentation is complete  
✅ Versioning follows semantic versioning  
✅ Compatibility type is specified  

## Conclusion

Well-designed event schemas are foundational to reliable event-driven systems. Following FlowMart's event schema guidelines ensures our services can communicate reliably today and evolve confidently tomorrow.

## Next Steps

- Explore [API design best practices](/docs/guides/creating-new-microservices/api-design)
- Understand [Observability in microservices](/docs/guides/creating-new-microservices/observability)
- Learn about [CI/CD for microservices](/docs/guides/creating-new-microservices/cicd-pipeline) 